Preface
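When you run batch operations on large volumes of data, performance tanks, and you also have to think about transactions. Is there a batch-processing approach that is both simple and general? There is: running the batch as multiple tasks on a thread pool is a very important option.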
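Steps
- Obtain a database connection and take manual control of the transaction.
- Split the large list into N smaller lists.
- Start a thread pool, run a batch update for each small list, and return the result.
- Submit all tasks to the pool, collect the returned Future objects, and use them to decide whether to commit or roll back the transaction.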
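The four steps can be tried out without a database. Below is a minimal JDK-only sketch. It assumes a hypothetical in-memory `FakeTransaction` in place of the JDBC connection and a hand-written `partition` helper in place of Guava's `Lists.partition`. The list is split into chunks, each chunk runs as a `Callable` in a pool, and the `Future` results decide between commit and rollback. A negative value plays the role of a row that fails to insert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

public class BatchSketch {

    /** Hypothetical in-memory stand-in for a transactional JDBC connection. */
    static class FakeTransaction {
        final Queue<Integer> pending = new ConcurrentLinkedQueue<>(); // written by worker threads
        final List<Integer> committed = new ArrayList<>();            // only touched by the caller

        void commit() { committed.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    /** Step 2: split into consecutive sub-lists of at most `size` elements. */
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int from = 0; from < list.size(); from += size) {
            parts.add(list.subList(from, Math.min(from + size, list.size())));
        }
        return parts;
    }

    /** Steps 1-4; returns true if the transaction was committed. */
    static boolean run(List<Integer> rows, int chunkSize, FakeTransaction tx) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (List<Integer> chunk : partition(rows, chunkSize)) {
                // Step 3: each task "inserts" one chunk and returns the affected row count
                tasks.add(() -> {
                    for (Integer row : chunk) {
                        if (row < 0) {
                            throw new IllegalArgumentException("bad row " + row);
                        }
                        tx.pending.add(row);
                    }
                    return chunk.size();
                });
            }
            // Step 4: invokeAll waits for every task; commit only if every chunk succeeded
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                try {
                    if (f.get() <= 0) {
                        tx.rollback();
                        return false;
                    }
                } catch (ExecutionException e) {
                    tx.rollback();
                    return false;
                }
            }
            tx.commit();
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) {
            rows.add(i);
        }
        FakeTransaction ok = new FakeTransaction();
        System.out.println(run(rows, 1000, ok) + " " + ok.committed.size()); // true 10000

        List<Integer> broken = new ArrayList<>(rows);
        broken.set(5_000, -1); // one bad row in the sixth chunk
        FakeTransaction failed = new FakeTransaction();
        System.out.println(run(broken, 1000, failed) + " " + failed.committed.size()); // false 0
    }
}
```

A chunk that throws surfaces as an `ExecutionException` from `Future.get()`. The sketch treats that the same way as a non-positive row count: everything is rolled back.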
Hands-on
Service
```java
package cn.goitman.service;

import com.google.common.collect.Lists;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiFunction;

@Service
public class ThreadPoolService {

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /** Number of elements per sub-list. */
    private static final int splitSize = 1000;

    public <T, U> void threadMethod(List<T> bigList, Class<U> mapperClass,
                                    BiFunction<List<T>, U, Integer> function) throws SQLException {
        long start = System.currentTimeMillis();

        // One session = one connection = one transaction shared by all sub-tasks.
        // Note: the MyBatis docs state that SqlSession is not thread-safe, so sharing it across threads is a trade-off.
        SqlSession sqlSession = sqlSessionFactory.openSession();
        Connection connection = sqlSession.getConnection();

        int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
        int maximumPoolSize = corePoolSize * 2;
        int capacity = maximumPoolSize / 2; // integer division already rounds down
        // CallerRunsPolicy instead of AbortPolicy: when every thread is busy and the queue is full,
        // the calling thread runs the task itself, so large lists are not rejected
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(capacity), new ThreadPoolExecutor.CallerRunsPolicy());

        // Split the big list into sub-lists of at most splitSize elements (views over bigList)
        List<List<T>> resultList = Lists.partition(bigList, splitSize);
        List<Callable<Integer>> callList = Lists.newArrayList();

        try {
            connection.setAutoCommit(false);
            U mapper = sqlSession.getMapper(mapperClass);
            for (List<T> parList : resultList) {
                // Each task copies its sub-list, runs the batch operation and returns the affected row count
                callList.add(() -> function.apply(Lists.newArrayList(parList), mapper));
            }

            // Run all tasks and wait for them; a task that affects no rows causes a rollback
            List<Future<Integer>> futures = threadPool.invokeAll(callList);
            for (Future<Integer> future : futures) {
                Integer rows = future.get(); // rethrows a task's exception as ExecutionException
                if (rows == null || rows <= 0) {
                    connection.rollback();
                    return;
                }
            }

            connection.commit();
            System.out.println(bigList.size() + " rows took " + (System.currentTimeMillis() - start) + " ms");
        } catch (Exception e) {
            connection.rollback();
            System.out.println("Failed after " + (System.currentTimeMillis() - start) + " ms");
            e.printStackTrace();
        } finally {
            // Release the pool threads and the connection on every path, including the early return
            threadPool.shutdown();
            sqlSession.close();
        }
    }
}
```
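A note on the pool settings: with a bounded `ArrayBlockingQueue` and `AbortPolicy`, the pool can hold at most `maximumPoolSize` running tasks plus `capacity` queued ones. `invokeAll` submits every task before it waits, so if the list splits into more chunks than that while the earlier chunks are still running, `invokeAll` throws `RejectedExecutionException`. On a machine where `availableProcessors()` returns 4, the sizing formulas in `ThreadPoolService` give corePoolSize = 5, maximumPoolSize = 10 and capacity = 5. That is room for 15 chunks, so a list of more than 15,000 rows at a split size of 1000 can be rejected. The JDK-only sketch below reproduces this with a one-thread pool and a one-slot queue. It also shows that `CallerRunsPolicy` runs the overflow task on the submitting thread instead. The latch-based tasks are only there to make the timing deterministic.

```java
import java.util.Collections;
import java.util.concurrent.*;

public class RejectionDemo {

    /** AbortPolicy: returns true if invokeAll threw RejectedExecutionException. */
    static boolean abortPolicyRejects() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // One thread plus a queue of one: room for only two tasks at a time
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        Callable<Integer> blocking = () -> {
            release.await(); // keeps the only thread busy
            return 1;
        };
        try {
            pool.invokeAll(Collections.nCopies(3, blocking));
            return false;
        } catch (RejectedExecutionException e) {
            return true; // third task: thread busy and queue full
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    /** CallerRunsPolicy: returns how many of the three tasks ran on the calling thread. */
    static int callerRunsCount() throws InterruptedException, ExecutionException {
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Callable<Integer> task = () -> {
            if (Thread.currentThread() == caller) {
                release.countDown(); // the overflow task runs here and unblocks the pool thread
                return 1;
            }
            release.await();
            return 0;
        };
        try {
            int onCaller = 0;
            for (Future<Integer> f : pool.invokeAll(Collections.nCopies(3, task))) {
                onCaller += f.get();
            }
            return onCaller;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(abortPolicyRejects()); // true
        System.out.println(callerRunsCount());    // 1
    }
}
```

`CallerRunsPolicy` also acts as natural back-pressure: while the caller is busy running a task, it cannot submit more. An alternative is to size the queue from the number of chunks.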
Testing
I didn't write a unit test. Instead, a startup runner (CommandLineRunner) preloads data to simulate the batch processing.
```java
package cn.goitman.commandrunner;

import cn.goitman.mapper.BallDao;
import cn.goitman.pojo.Period;
import cn.goitman.service.ThreadPoolService;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Order(0)
public class StartRunner implements CommandLineRunner {

    @Autowired
    private ThreadPoolService threadPoolService;

    @Override
    public void run(String... args) throws Exception {
        List<Period> list = Lists.newArrayList();

        // Build 10,000 test records (i = 1 .. 10000)
        int size = 10001;
        for (int i = 1; i < size; i++) {
            Period period = new Period(i + "", i + "", i + "", i + "", i + "", i + "", i + "");
            list.add(period);
        }

        // Each sub-list is inserted through the BallDao mapper; "mapper" avoids reusing the type name as a variable
        threadPoolService.threadMethod(list, BallDao.class, (data, mapper) -> mapper.insertPeriod(data));
    }
}
```
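The third argument to `threadMethod` is a `BiFunction` that receives the chunk first and the mapper second. That is why the call uses a lambda rather than a method reference: an unbound reference such as `BallDao::insertPeriod` would take the mapper as its first argument. Below is a minimal sketch with a hypothetical `PeriodDao` interface and an in-memory implementation standing in for `BallDao`. The real `BallDao` signature is not shown in this post; `insertPeriod` returning the affected row count is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class LambdaDemo {

    /** Hypothetical stand-in for a MyBatis mapper such as BallDao. */
    interface PeriodDao {
        int insertPeriod(List<String> rows); // assumed to return the affected row count
    }

    /** Hypothetical in-memory implementation that records the inserted rows. */
    static class InMemoryPeriodDao implements PeriodDao {
        final List<String> table = new ArrayList<>();

        @Override
        public int insertPeriod(List<String> rows) {
            table.addAll(rows);
            return rows.size();
        }
    }

    /** Same argument order as threadMethod's function: (chunk, mapper) -> affected rows. */
    static <T, U> int applyToChunk(List<T> chunk, U mapper, BiFunction<List<T>, U, Integer> function) {
        return function.apply(chunk, mapper);
    }

    public static void main(String[] args) {
        InMemoryPeriodDao dao = new InMemoryPeriodDao();
        List<String> chunk = Arrays.asList("1", "2", "3");

        // Chunk first, mapper second
        int rows = applyToChunk(chunk, dao, (data, mapper) -> mapper.insertPeriod(data));
        System.out.println(rows); // 3

        // The unbound method reference fits the opposite order: (mapper, chunk) -> rows
        BiFunction<PeriodDao, List<String>, Integer> reversed = PeriodDao::insertPeriod;
        System.out.println(reversed.apply(dao, chunk)); // 3
    }
}
```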
Afterword
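Callable can return a result and can throw exceptions for later handling. Any extra cost compared with Runnable is tiny (a Runnable passed to ExecutorService.submit is wrapped in a Future as well), and the benefits are considerable. Once you get fluent with multithreading, your whole life seems to switch into multithreaded mode...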
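The difference that matters for the commit-or-rollback decision is how results and failures come back. A Callable's return value and any exception it throws both arrive through `Future.get()`, with the exception wrapped in `ExecutionException`. A Runnable has no return value at all. A minimal JDK-only sketch follows; the "duplicate key" message is made up for illustration.

```java
import java.util.concurrent.*;

public class CallableDemo {

    /** Submits a Callable and reports its result, or the exception it threw. */
    static String runAndReport(Callable<Integer> task) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(task);
            try {
                return "result=" + future.get();
            } catch (ExecutionException e) {
                // get() wraps the task's exception; getCause() is the original one
                return "failed: " + e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    /** A Runnable has no return value, so its Future completes with null. */
    static Object runnableResult() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable job = () -> { };
            return pool.submit(job).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndReport(() -> 42)); // result=42
        System.out.println(runAndReport(() -> {
            throw new Exception("duplicate key");
        }));                                         // failed: duplicate key
        System.out.println(runnableResult());        // null
    }
}
```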
Source code: https://github.com/wangdaicong/spring-boot-project/tree/master/generalTool-demo